{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Building and Deploying our ML Model\n", "\n", "**SageMaker Studio Kernel**: Data Science\n", "\n", "In this exercise you will do:\n", " - Create/Run a Model Building Pipeline using Pytorch and [SageMaker Pipelines](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines.html)\n", " - Compute the thresholds, used by the applicatio to classify the predictions as anomalies or normal behavior\n", " - Compile/Optimize your model to your edge device (Linux X86_64) using [SageMaker NEO](https://docs.aws.amazon.com/sagemaker/latest/dg/neo.html)\n", " - Create a deployment package with a signed model + the runtime used by SageMaker Edge Agent to load and invoke the optimized model\n", " - Deploy the package using IoT Jobs\n", "\n", "The following diagram shows all the steps we're going to execute: \n", "![Pipeline](../imgs/EdgeManagerWorkshop_ModelPipeline.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Part 1/4 - Setup\n", "Here we'll import some libraries and define some variables. You can also take a look on the scripts that were previously created for preparing the data and training our model." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sagemaker\n", "import numpy as np\n", "import sagemaker\n", "import glob\n", "import os\n", "import boto3" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "project_name='<>'\n", "\n", "s3_client = boto3.client('s3')\n", "sm_client = boto3.client('sagemaker')\n", "\n", "project_id = sm_client.describe_project(ProjectName=project_name)['ProjectId']\n", "bucket_name = 'sagemaker-wind-turbine-farm-%s' % project_id\n", "\n", "prefix='wind_turbine_anomaly'\n", "sagemaker_session=sagemaker.Session(default_bucket=bucket_name)\n", "role = sagemaker.get_execution_role()\n", "print('Project name: %s' % project_name)\n", "print('Project id: %s' % project_id)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Get the dataset and upload it to an S3 bucket\n", "This bucket will be the input path of the data prep step of our ML Pipeline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Download the \n", "!mkdir -p data\n", "!curl https://aws-ml-blog.s3.amazonaws.com/artifacts/monitor-manage-anomaly-detection-model-wind-turbine-fleet-sagemaker-neo/dataset_wind_turbine.csv.gz -o data/dataset_wind.csv.gz\n", "# clean the buckets first\n", "s3_client.delete_object(Bucket=bucket_name, Key='%s/data/' % prefix)\n", "s3_client.delete_object(Bucket=bucket_name, Key='wind_turbine_anomaly/output')\n", "\n", "input_data = sagemaker_session.upload_data('data/dataset_wind.csv.gz', key_prefix=\"%s/data\" % prefix )\n", "print(input_data)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Visualize the training script & the preprocessing script" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "## This script was created to express what we saw in the previous exercise.\n", "## It will get the raw data from the turbine sensors, select some features, \n", "## denoise, normalize, encode and reshape it as a 6x10x10 tensor\n", "## This script is the entrypoint of the first step of the ML Pipelie: Data preparation\n", "!pygmentize preprocessing.py" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "## This is the training/prediction script, used by the training step of \n", "## our ML Pipeline. In this step, a SageMaker Training Job will run this \n", "## script to build the model. Then, in the batch transform step,\n", "## the same script will be used again to load the trained model\n", "## and rebuild (predict) all the training samples. These predictions\n", "## will then be used to compute MAE and the thresholds, for detecting anomalies\n", "!pygmentize wind_turbine.py" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Creating our ML Pipeline" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Input parameters of the pipeline\n", "These input parameters can be overriden later if you want. The final pipeline is like a function f(x), where you reuse many times to train/retrain your model." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import time\n", "from sagemaker.workflow.steps import CacheConfig\n", "from sagemaker.workflow.parameters import (\n", " ParameterInteger,\n", " ParameterString,\n", ")\n", "## By enabling cache, if you run this pipeline again, without changing the input \n", "## parameters it will skip the training part and reuse the previous trained model\n", "cache_config = CacheConfig(enable_caching=True, expire_after=\"30d\")\n", "ts = time.strftime('%Y-%m-%d-%H-%M-%S')\n", "\n", "# Data prep\n", "processing_instance_type = ParameterString( # instance type for data preparation\n", " name=\"ProcessingInstanceType\",\n", " default_value=\"ml.m5.xlarge\"\n", ")\n", "processing_instance_count = ParameterInteger( # number of instances used for data preparation\n", " name=\"ProcessingInstanceCount\",\n", " default_value=1\n", ")\n", "\n", "# Training\n", "training_instance_type = ParameterString( # instance type for training the model\n", " name=\"TrainingInstanceType\",\n", " default_value=\"ml.g4dn.xlarge\"\n", ")\n", "training_instance_count = ParameterInteger( # number of instances used to train your model\n", " name=\"TrainingInstanceCount\",\n", " default_value=1 # wind_turbine.py supports only 1 instance\n", ")\n", "\n", "# Batch prediction\n", "transform_instance_type = ParameterString( # instance type for batch transform jobs\n", " name=\"TransformInstanceType\",\n", " default_value=\"ml.c5.xlarge\"\n", ")\n", "transform_instance_count = ParameterInteger( # number of instances used for batch prediction\n", " name=\"TransformInstanceCount\",\n", " default_value=2\n", ")\n", "\n", "# Dataset input data: S3 path\n", "input_data = ParameterString(\n", " name=\"InputData\",\n", " default_value=input_data,\n", ")\n", "\n", "# Batch prediction output: S3 path\n", "output_batch_data = ParameterString(\n", " name=\"OutputBatchData\",\n", " default_value=\"s3://%s/%s/output\" % (bucket_name, prefix),\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Defining the steps of our pipeline\n", "\n", "### Step 1/5 - Preprocess the raw data to clean, denoise and normalize" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "from sagemaker.sklearn.processing import SKLearnProcessor\n", "from sagemaker.processing import ProcessingInput, ProcessingOutput\n", "from sagemaker.workflow.steps import ProcessingStep\n", "\n", "sts_client = boto3.client('sts')\n", "account_id = sts_client.get_caller_identity().get('Account')\n", "region = boto3.session.Session().region_name\n", "\n", "script_processor = SKLearnProcessor(\n", " framework_version=\"0.23-1\",\n", " role=role,\n", " instance_type=processing_instance_type,\n", " instance_count=processing_instance_count,\n", " max_runtime_in_seconds=7200,\n", ")\n", "\n", "step_process = ProcessingStep(\n", " name=\"WindTurbineDataPreprocess\",\n", " code='preprocessing.py', ## this is the script defined above\n", " processor=script_processor,\n", " inputs=[\n", " ProcessingInput(source=input_data, destination='/opt/ml/processing/input')\n", " ],\n", " outputs=[\n", " ProcessingOutput(output_name='train_data', source='/opt/ml/processing/train'),\n", " ProcessingOutput(output_name='statistics', source='/opt/ml/processing/statistics')\n", " ],\n", " job_arguments=['--num-dataset-splits', '20']\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Step 2/5 - Training with a Pytorch Estimator" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### First we create the SageMaker Estimator" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.pytorch.estimator import PyTorch\n", "framework_version='1.6.0'\n", "py_version='py3'\n", "estimator = PyTorch(\n", " 'wind_turbine.py', ## This is the script (wind_turbine.py) defined above\n", " framework_version=framework_version,\n", " role=role,\n", " sagemaker_session=sagemaker_session,\n", " instance_type=training_instance_type,\n", " instance_count=training_instance_count,\n", " py_version=py_version, \n", " hyperparameters={\n", " 'k_fold_splits': 6,\n", " 'k_index_only': 3, # after running some experiments with this dataset, it makes sense to fix it\n", " 'num_epochs': 200,\n", " 'batch_size': 256,\n", " 'learning_rate': 0.0001,\n", " 'dropout_rate': 0.001\n", " },\n", " metric_definitions=[\n", " {'Name': 'train_loss:mse', 'Regex': ' train_loss=(\\S+);'},\n", " {'Name': 'test_loss:mse', 'Regex': ' test_loss=(\\S+);'}\n", " ]\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Then, we define the training step" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.inputs import TrainingInput\n", "from sagemaker.workflow.steps import TrainingStep\n", "\n", "step_train = TrainingStep(\n", " name=\"WindTurbineAnomalyTrain\",\n", " estimator=estimator,\n", " inputs={\"train\": TrainingInput(\n", " s3_data=step_process.properties.ProcessingOutputConfig.Outputs[\"train_data\"].S3Output.S3Uri,\n", " content_type=\"application/x-npy\"\n", " )},\n", " cache_config=cache_config\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Step 4/5 - Create a new model in the SageMaker Models Catalog\n", "This step will transform the results of your Training Job into a real Model. After that, you'll be able to deploy and invoke your model" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.workflow.steps import CreateModelStep\n", "from sagemaker.model import Model\n", "from sagemaker.inputs import CreateModelInput\n", "\n", "model = Model(\n", " image_uri=sagemaker.image_uris.retrieve(\n", " framework=\"pytorch\", # we are using the SageMaker pre-built PyTorch inference image\n", " region=sagemaker_session.boto_session.region_name,\n", " version=framework_version,\n", " py_version=py_version,\n", " instance_type=training_instance_type,\n", " image_scope='inference'\n", " ),\n", " model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,\n", " sagemaker_session=sagemaker_session,\n", " role=role\n", ")\n", "step_create_model = CreateModelStep(\n", " name=\"WindTurbineAnomalyCreateModel\",\n", " model=model,\n", " inputs=CreateModelInput(\n", " instance_type=transform_instance_type\n", " )\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Step 5/5 - Run a batch transform job to get all the predictions\n", "The predictions will then be used to compute the Thresholds. Only the training samples will be used in this step" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.workflow.steps import TransformStep\n", "from sagemaker.inputs import TransformInput\n", "from sagemaker.transformer import Transformer\n", "\n", "step_transform = TransformStep(\n", " name=\"WindTurbineAnomalyTransform\",\n", " transformer=Transformer(\n", " model_name=step_create_model.properties.ModelName,\n", " instance_type=transform_instance_type,\n", " instance_count=transform_instance_count,\n", " output_path=output_batch_data,\n", " accept='application/x-npy',\n", " max_payload=20,\n", " strategy='MultiRecord',\n", " assemble_with='Line'\n", " ),\n", " inputs=TransformInput(data=step_process.properties.ProcessingOutputConfig.Outputs[\"train_data\"].S3Output.S3Uri, content_type=\"application/x-npy\")\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Now that we have created all the requireds steps, its time to create our pipeline\n", "This code will create a physical (with ARN) resource that will execute all the steps defined abov" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from botocore.exceptions import ClientError, ValidationError\n", "from sagemaker.workflow.pipeline import Pipeline\n", "\n", "# NOTE:\n", "# condition steps have issues in service so we go straight to step_register\n", "pipeline_name = \"WindTurbineAnomalyTrain-%s\" % ts\n", "pipeline = Pipeline(\n", " name=pipeline_name,\n", " parameters=[\n", " processing_instance_type,\n", " processing_instance_count, \n", " training_instance_type,\n", " training_instance_count, \n", " transform_instance_type,\n", " transform_instance_count, \n", " input_data,\n", " output_batch_data\n", " ],\n", " steps=[step_process, step_train, step_create_model, step_transform],\n", " sagemaker_session=sagemaker_session,\n", ")\n", "\n", "try:\n", " response = pipeline.create(role_arn=role)\n", "except ClientError as e:\n", " error = e.response[\"Error\"]\n", " if error[\"Code\"] == \"ValidationError\" and \"Pipeline names must be unique within\" in error[\"Message\"]:\n", " print(error[\"Message\"])\n", " response = pipeline.describe()\n", " else:\n", " raise\n", "\n", " ## The following code put some tags that will be tracked by SageMaker Studio\n", "pipeline_arn = response[\"PipelineArn\"]\n", "sm_client = boto3.client('sagemaker')\n", "sm_client.add_tags(\n", " ResourceArn=pipeline_arn,\n", " Tags=[\n", " {'Key': 'sagemaker:project-name', 'Value': project_name },\n", " {'Key': 'sagemaker:project-id', 'Value': project_id }\n", " ]\n", ")\n", "print(pipeline_arn)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Part 2/4 - Now its time to execute our pipeline. After kicking-off the pipeline, you can open SageMaker Studio and go to your project -> Pipelines to see execution\n", "It takes ~17mins to complete the whole pipeline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "start_response = pipeline.start(parameters={\n", " # 'TrainingInstanceType': 'ml.c5.xlarge', # uncomment this line if your account doesn't support g4 instances. It will take 20x more to finish.\n", " 'TransformInstanceType': 'ml.c5.xlarge'\n", "})\n", "\n", "pipeline_execution_arn = start_response.arn\n", "print(pipeline_execution_arn)\n", "\n", "while True:\n", " resp = sm_client.describe_pipeline_execution(PipelineExecutionArn=pipeline_execution_arn)\n", " if resp['PipelineExecutionStatus'] == 'Executing':\n", " print('Running...')\n", " else:\n", " print(resp['PipelineExecutionStatus'], pipeline_execution_arn)\n", " break\n", " time.sleep(30)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Part 3/4 - Compute the threshold based on MAE" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Download the predictions & Compute MAE/thresholds" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sagemaker_session.download_data(bucket=bucket_name, key_prefix='wind_turbine_anomaly/output/', path='data/preds/')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "import sagemaker\n", "\n", "sm_client = boto3.client('sagemaker')\n", "sagemaker_session = sagemaker.Session()\n", "role = sagemaker.get_execution_role()\n", "\n", "execution_id = pipeline_execution_arn.split('/')[-1]\n", "training_jobs = sm_client.list_training_jobs(NameContains=execution_id, StatusEquals='Completed')['TrainingJobSummaries']\n", "\n", "assert(len(training_jobs) == 1) # it must have exactly one training job\n", "training_job_name=training_jobs[0]['TrainingJobName']\n", "\n", "# We will recreate the estimator, based on the training job\n", "estimator = sagemaker.estimator.Estimator.attach(\n", " training_job_name=training_job_name, \n", " sagemaker_session=sagemaker_session\n", ")\n", "input_data = sm_client.describe_training_job(TrainingJobName=training_job_name)\n", "input_data = input_data['InputDataConfig'][0]['DataSource']['S3DataSource']['S3Uri']\n", "\n", "tokens = input_data.split('/', 3)\n", "sagemaker_session.download_data(bucket=tokens[2], key_prefix=tokens[3], path='data/input/')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import glob\n", "\n", "x_inputs = np.vstack([np.load(i) for i in glob.glob('data/input/*.npy')])\n", "y_preds = np.vstack([np.load(i) for i in glob.glob('data/preds/*.out')])\n", "\n", "n_samples,n_features,n_rows,n_cols = x_inputs.shape\n", "\n", "x_inputs = x_inputs.reshape(n_samples, n_features, n_rows*n_cols).transpose((0,2,1))\n", "y_preds = y_preds.reshape(n_samples, n_features, n_rows*n_cols).transpose((0,2,1))\n", "\n", "mae_loss = np.mean(np.abs(y_preds - x_inputs), axis=1).transpose((1,0))\n", "mae_loss[np.isnan(mae_loss)] = 0\n", "\n", "thresholds = np.mean(mae_loss, axis=1)\n", "np.save('statistics/thresholds.npy', thresholds)\n", "print(\",\".join(thresholds.astype(str)))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Part 4/4 - Compiling/Packaging/Deploying our ML model to our edge devices" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Invoking SageMaker NEO to compile the trained model" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "compilation_job_name = 'wind-turbine-anomaly-%d' % int(time.time()*1000)\n", "sm_client.create_compilation_job(\n", " CompilationJobName=compilation_job_name,\n", " RoleArn=role,\n", " InputConfig={\n", " 'S3Uri': '%s%s/output/model.tar.gz' % (estimator.output_path, training_job_name),\n", " 'DataInputConfig': '{\"input0\":[1,%d,10,10]}' % n_features,\n", " 'Framework': 'PYTORCH'\n", " },\n", " OutputConfig={\n", " 'S3OutputLocation': 's3://%s/wind_turbine/optimized/' % sagemaker_session.default_bucket(), \n", " 'TargetPlatform': { 'Os': 'LINUX', 'Arch': 'X86_64' }\n", " },\n", " StoppingCondition={ 'MaxRuntimeInSeconds': 900 }\n", ")\n", "while True:\n", " resp = sm_client.describe_compilation_job(CompilationJobName=compilation_job_name) \n", " if resp['CompilationJobStatus'] in ['STARTING', 'INPROGRESS']:\n", " print('Running...')\n", " else:\n", " print(resp['CompilationJobStatus'], compilation_job_name)\n", " break\n", " time.sleep(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Building the Deployment Package SageMaker Edge Manager\n", "It will sign the model and create a deployment package with:\n", " - The optimized model\n", " - Model Metadata\n", " - SageMaker NEO runtime (dlr)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import time\n", "model_version = '1.0'\n", "model_name = 'WindTurbineAnomalyDetection'\n", "edge_packaging_job_name='wind-turbine-anomaly-%d' % int(time.time()*1000)\n", "resp = sm_client.create_edge_packaging_job(\n", " EdgePackagingJobName=edge_packaging_job_name,\n", " CompilationJobName=compilation_job_name,\n", " ModelName=model_name,\n", " ModelVersion=model_version,\n", " RoleArn=role,\n", " OutputConfig={\n", " 'S3OutputLocation': 's3://%s/%s/model/' % (bucket_name, prefix)\n", " }\n", ")\n", "while True:\n", " resp = sm_client.describe_edge_packaging_job(EdgePackagingJobName=edge_packaging_job_name) \n", " if resp['EdgePackagingJobStatus'] in ['STARTING', 'INPROGRESS']:\n", " print('Running...')\n", " else:\n", " print(resp['EdgePackagingJobStatus'], compilation_job_name)\n", " break\n", " time.sleep(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Deploy the package\n", "Using IoT Jobs, we will notify the Python application in the edge devices. The application will:\n", " - download the deployment package\n", " - unpack it\n", " - load the new mode (unload previous versions if any)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "import json\n", "import sagemaker\n", "import uuid\n", "\n", "iot_client = boto3.client('iot')\n", "sts_client = boto3.client('sts')\n", "\n", "model_version = '1.0'\n", "model_name = 'WindTurbineAnomalyDetection'\n", "sagemaker_session=sagemaker.Session()\n", "region_name = sagemaker_session.boto_session.region_name\n", "account_id = sts_client.get_caller_identity()[\"Account\"]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "resp = iot_client.create_job(\n", " jobId=str(uuid.uuid4()),\n", " targets=[\n", " 'arn:aws:iot:%s:%s:thinggroup/WindTurbineFarm-%s' % (region_name, account_id, project_id), \n", " ],\n", " document=json.dumps({\n", " 'type': 'new_model',\n", " 'model_version': model_version,\n", " 'model_name': model_name,\n", " 'model_package_bucket': bucket_name,\n", " 'model_package_key': \"%s/model/%s-%s.tar.gz\" % (prefix, model_name, model_version) \n", " }),\n", " targetSelection='SNAPSHOT'\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Alright! Now, the deployment process will start on the connected edge devices! \n", "You can start the Exercise #3: Run a simulated fleet of wind turbines and edge devices. Predict anomalies\n", " \n", " > [Exercise 03](03%20-%20Run%20Fleet.ipynb)\n" ] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }